package com.xueyuan.wata.daph.flink117.computer

import com.xueyuan.wata.daph.api.computer.Computer
import com.xueyuan.wata.daph.flink117.enums.CatalogDatabaseType
import com.xueyuan.wata.daph.flink117.utils.CatalogUtil
import com.xueyuan.wata.daph.utils.JsonUtil.defaultMapper
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.catalog.Catalog
import org.apache.logging.log4j.scala.Logging

import scala.collection.mutable

/**
 * Base class for Flink-backed computers.
 *
 * Holds the [[TableEnvironment]] supplied by the concrete subclass and a
 * by-name registry of the catalogs created from `computerConfig.catalogConfigs`.
 *
 * @param computerConfig configuration of this computer, including the list of
 *                       catalog configuration maps
 */
abstract class FlinkComputer(val computerConfig: FlinkComputerConfig) extends Computer with Logging {
  /** Table environment provided by the concrete subclass. */
  val tableEnv: TableEnvironment

  // Catalogs created by initGlobalCatalog(), keyed by the config's "name" entry.
  private val catalogs = mutable.HashMap[String, Catalog]()

  /**
   * Builds the computer from a JSON document that is deserialized into a
   * [[FlinkComputerConfig]] with the shared `defaultMapper`.
   *
   * @param json JSON representation of the computer configuration
   */
  def this(json: String) = this(
    defaultMapper.readValue(json, classOf[FlinkComputerConfig])
  )

  /**
   * Returns the catalog registered under `catalogName`.
   *
   * Only catalogs whose config had `enabled = true` at the time
   * `initGlobalCatalog()` ran are present in the registry.
   *
   * @param catalogName value of the `name` entry of the catalog config
   * @throws NoSuchElementException if no catalog is registered under that name
   */
  def globalCatalog(catalogName: String): Catalog =
    catalogs.getOrElse(
      catalogName,
      throw new NoSuchElementException(
        s"No global catalog registered under name '$catalogName' " +
          "(it may be disabled or initGlobalCatalog() has not been called)"
      )
    )

  /**
   * Returns the first catalog configuration whose `name` entry equals `catalogName`.
   *
   * Entries without a `name` key are skipped rather than aborting the lookup.
   *
   * @param catalogName value of the `name` entry to look for
   * @throws NoSuchElementException if no configuration carries that name
   */
  def globalCatalogConfig(catalogName: String): Map[String, String] =
    computerConfig.catalogConfigs
      .find(_.get("name").contains(catalogName))
      .getOrElse(
        throw new NoSuchElementException(
          s"No catalog configuration found with name '$catalogName'"
        )
      )

  /**
   * Creates a catalog for every enabled entry of `computerConfig.catalogConfigs`
   * and stores it in the internal registry under the entry's `name`.
   *
   * Each entry must provide the keys `enabled`, `cType` and `name`; `cType` and
   * `enabled` are stripped before the remaining entries are handed to
   * `CatalogUtil.createCatalog`. This method only fills the registry used by
   * `globalCatalog`; it does not touch `tableEnv`.
   *
   * @throws NoSuchElementException   if a required key is missing
   * @throws IllegalArgumentException if `enabled` is not a valid boolean string
   */
  protected def initGlobalCatalog(): Unit = {
    computerConfig.catalogConfigs.foreach { catalogConfig =>
      if (catalogConfig("enabled").toBoolean) {
        // Resolve the registry key first so a config without "name" fails
        // before a catalog instance is created and then thrown away.
        val catalogName = catalogConfig("name")
        val cType = CatalogDatabaseType.withName(catalogConfig("cType"))

        // "cType" and "enabled" are control keys for this class, not catalog options.
        val config = catalogConfig - "cType" - "enabled"
        val catalog = CatalogUtil.createCatalog(cType, config)

        catalogs += catalogName -> catalog
      }
    }
  }
}
